package com.atguigu.chapter06;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Counts ad clicks per (province, ad) pair from a CSV click log.
 *
 * <p>Each input line is parsed into an {@link AdsClickLog}. Every click is then mapped to a
 * {@code (province + "_" + adId, 1)} tuple, keyed by that composite string, and summed. The
 * running count for each key is printed each time a click for that key arrives.
 *
 * @author cjp
 * @version 1.0
 * @date 2021/3/5 16:10
 */
public class Flink08_AdClickAnalysis {

    /** Click-log CSV read by the job, relative to the working directory. */
    private static final String INPUT_PATH = "input/AdClickLog.csv";

    /** Minimum number of comma-separated fields a click-log line must contain. */
    private static final int MIN_FIELD_COUNT = 5;

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so that every operator, including the print sink, runs as a single subtask.
        env.setParallelism(1);

        // 1. Read the raw click log and parse each line.
        SingleOutputStreamOperator<AdsClickLog> adClickDS = env
                .readTextFile(INPUT_PATH)
                .map(line -> parseLine(line));

        // 2. Count clicks per province and ad: group by the composite "province_adId" key.
        //    returns(...) is required because the lambda's Tuple2 result type is erased at
        //    compile time, so Flink cannot infer it from the lambda alone.
        adClickDS
                .map(click -> Tuple2.of(click.getProvince() + "_" + click.getAdId(), 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(countTuple -> countTuple.f0)
                .sum(1)
                .print();

        env.execute();
    }

    /**
     * Parses one CSV line of the click log into an {@link AdsClickLog}.
     *
     * <p>Fields are passed positionally to the {@code AdsClickLog} constructor: columns 0, 1 and 4
     * are parsed as {@code Long}, and columns 2 and 3 are kept as strings. Any columns after the
     * fifth are ignored.
     *
     * @param line one raw line of the click-log file
     * @return the parsed click record
     * @throws IllegalArgumentException if the line has fewer than {@value #MIN_FIELD_COUNT}
     *     fields, or (as {@link NumberFormatException}) if a numeric column is not a valid long
     */
    private static AdsClickLog parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < MIN_FIELD_COUNT) {
            // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Expected at least " + MIN_FIELD_COUNT + " comma-separated fields but got "
                            + fields.length + ": " + line);
        }
        return new AdsClickLog(
                Long.valueOf(fields[0]),
                Long.valueOf(fields[1]),
                fields[2],
                fields[3],
                Long.valueOf(fields[4]));
    }

}
